package com.bruce.tool.mq.rocket.core;

import com.bruce.tool.common.exception.BaseRuntimeException;
import com.bruce.tool.common.util.LogUtils;
import com.bruce.tool.common.util.string.JsonUtils;
import com.bruce.tool.mq.rocket.config.RocketConfig;
import com.bruce.tool.mq.rocket.constant.RocketCode;
import com.bruce.tool.mq.rocket.constant.RocketPool;
import com.bruce.tool.mq.rocket.domain.RocketMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * 功能 :
 *
 * @author : Bruce(刘正航) 10:02 2019-02-21
 */
@Slf4j
@Component
public class RocketSender {

    @Autowired
    private RocketPool pool;

    public void execute(RocketMessage rocketMessage){
        this.execute(rocketMessage,null);
    }

    public void execute(RocketMessage rocketMessage, SendCallback callback){
        DefaultMQProducer producer = pool.getSenders().get(rocketMessage.getCode());
        if( Objects.isNull(producer) ){
            throw new BaseRuntimeException(RocketCode.MQERROR_CONFIG_NULL.getCode(),RocketCode.MQERROR_CONFIG_NULL.getMessage());
        }
        RocketConfig config = pool.getConfigs().get(rocketMessage.getCode());
        if( Objects.isNull(config) ){
            throw new BaseRuntimeException(RocketCode.MQERROR_SENDER_NULL.getCode(),RocketCode.MQERROR_SENDER_NULL.getMessage());
        }
        Message message = buildMessage(rocketMessage,config);
        if(Objects.nonNull(callback) ){
            async(producer,message,callback);
            return;
        }
        sync(producer, message);
    }

    private void sync(DefaultMQProducer producer, Message message) {
        try {
            SendResult sendResult = producer.send(message);
            // 同步发送消息，只要不抛异常就是成功
            LogUtils.debug(log, JsonUtils.objToStr(sendResult));
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            log.error("{}",e.getMessage(),e);
            throw new BaseRuntimeException(e);
        }
    }

    private void async(DefaultMQProducer producer, Message message, SendCallback callback) {
        producer.setRetryTimesWhenSendAsyncFailed(0);
        try {
            producer.send(message,callback);
        } catch (MQClientException | RemotingException | InterruptedException e) {
            throw new BaseRuntimeException(e);
        }
    }

    private Message buildMessage(RocketMessage rocketMessage, RocketConfig config) {
        Message message = new Message(
                /* Topic */
                config.getTopic() ,
                /* Tag */
                config.getTags() ,
                /* Message body */
                rocketMessage.getBody()
        );
        if(StringUtils.isNotBlank(rocketMessage.getKey())){
            message.setKeys(rocketMessage.getKey());
        }
        return message;
    }

}
